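// Tests for the tokio runtime metrics API. These tests require the `full`
// feature and the `tokio_unstable` cfg flag, and are not built for WASI.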
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", tokio_unstable, not(target_os = "wasi")))]

use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use tokio::macros::support::poll_fn;

use tokio::runtime::Runtime;
use tokio::task::consume_budget;
use tokio::time::{self, Duration};

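// `num_workers()` reports the number of worker threads owned by the runtime:
// one for a current-thread runtime, two for the runtime built by `threaded()`.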
#[test]
fn num_workers() {
    let rt = current_thread();
    assert_eq!(1, rt.metrics().num_workers());

    let rt = threaded();
    assert_eq!(2, rt.metrics().num_workers());
}

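// The blocking pool starts with no threads; the first `spawn_blocking` call
// spawns one, and it remains counted after the closure finishes.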
#[test]
fn num_blocking_threads() {
    let rt = current_thread();
    assert_eq!(0, rt.metrics().num_blocking_threads());
    let _ = rt.block_on(rt.spawn_blocking(move || {}));
    assert_eq!(1, rt.metrics().num_blocking_threads());
}

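// After the blocking closure completes, its thread should eventually be
// reported as idle (idle blocking threads are only reaped after a long timeout).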
#[test]
fn num_idle_blocking_threads() {
    let rt = current_thread();
    assert_eq!(0, rt.metrics().num_idle_blocking_threads());
    let _ = rt.block_on(rt.spawn_blocking(move || {}));
    rt.block_on(async {
        time::sleep(Duration::from_millis(5)).await;
    });

    // We need to wait until the blocking thread has become idle. Usually 5ms is
    // enough for this to happen, but not always. When it isn't enough, sleep
    // for another second. We don't always wait for a whole second since we want
    // the test suite to finish quickly.
    //
    // Note that the timeout for idle threads to be killed is 10 seconds.
    if 0 == rt.metrics().num_idle_blocking_threads() {
        rt.block_on(async {
            time::sleep(Duration::from_secs(1)).await;
        });
    }

    assert_eq!(1, rt.metrics().num_idle_blocking_threads());
}

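// With the blocking pool capped at one thread, the second `spawn_blocking`
// call cannot start until the first closure returns, so it waits in the
// blocking queue and the reported depth stays non-zero until the mutex guard
// is dropped.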
#[test]
fn blocking_queue_depth() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .max_blocking_threads(1)
        .build()
        .unwrap();

    assert_eq!(0, rt.metrics().blocking_queue_depth());

    let ready = Arc::new(Mutex::new(()));
    let guard = ready.lock().unwrap();

    let ready_cloned = ready.clone();
    let wait_until_ready = move || {
        let _unused = ready_cloned.lock().unwrap();
    };

    let h1 = rt.spawn_blocking(wait_until_ready.clone());
    let h2 = rt.spawn_blocking(wait_until_ready);
    assert!(rt.metrics().blocking_queue_depth() > 0);

    drop(guard);

    let _ = rt.block_on(h1);
    let _ = rt.block_on(h2);

    assert_eq!(0, rt.metrics().blocking_queue_depth());
}

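// `active_tasks_count()` reflects tasks that have been spawned but not yet
// completed; it is observed from inside the spawned task, before it returns.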
#[test]
fn active_tasks_count() {
    let rt = current_thread();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.active_tasks_count());
    rt.spawn(async move {
        assert_eq!(1, metrics.active_tasks_count());
    });

    let rt = threaded();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.active_tasks_count());
    rt.spawn(async move {
        assert_eq!(1, metrics.active_tasks_count());
    });
}

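// `remote_schedule_count()` counts tasks scheduled from outside the runtime.
// Spawning through a `Handle` on a separate OS thread is a remote schedule;
// both runtime flavors should report exactly one.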
#[test]
fn remote_schedule_count() {
    use std::thread;

    let rt = current_thread();
    let handle = rt.handle().clone();
    let task = thread::spawn(move || {
        handle.spawn(async {
            // Do nothing
        })
    })
    .join()
    .unwrap();

    rt.block_on(task).unwrap();

    assert_eq!(1, rt.metrics().remote_schedule_count());

    let rt = threaded();
    let handle = rt.handle().clone();
    let task = thread::spawn(move || {
        handle.spawn(async {
            // Do nothing
        })
    })
    .join()
    .unwrap();

    rt.block_on(task).unwrap();

    assert_eq!(1, rt.metrics().remote_schedule_count());
}

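// A worker parks when it runs out of work; sleeping inside `block_on` is
// enough to make every worker park at least once.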
#[test]
fn worker_park_count() {
    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    assert!(1 <= metrics.worker_park_count(0));

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    assert!(1 <= metrics.worker_park_count(0));
    assert!(1 <= metrics.worker_park_count(1));
}

#[test]
fn worker_noop_count() {
    // There isn't really a great way to generate no-op parks, as they happen as
    // false-positive events under concurrency.

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    assert!(0 < metrics.worker_noop_count(0));

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    assert!(0 < metrics.worker_noop_count(0));
    assert!(0 < metrics.worker_noop_count(1));
}

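// The task below blocks its worker on `rx.recv()` before the tasks it spawned
// are polled, so the other worker has to steal work; the combined steal count
// across workers should be exactly one.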
#[test]
fn worker_steal_count() {
    // This metric only applies to the multi-threaded runtime.
    //
    // We use a blocking channel to back up one worker thread.
    use std::sync::mpsc::channel;

    let rt = threaded();
    let metrics = rt.metrics();

    rt.block_on(async {
        let (tx, rx) = channel();

        // Move to the runtime.
        tokio::spawn(async move {
            // Spawn the task that sends to the channel
            tokio::spawn(async move {
                tx.send(()).unwrap();
            });

            // Spawn a task that bumps the previous task out of the "next
            // scheduled" slot.
            tokio::spawn(async {});

            // Blocking receive on the channel.
            rx.recv().unwrap();
        })
        .await
        .unwrap();
    });

    drop(rt);

    let n: u64 = (0..metrics.num_workers())
        .map(|i| metrics.worker_steal_count(i))
        .sum();

    assert_eq!(1, n);
}

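// Each spawned task completes in a single poll, so the poll count should equal
// N. Mean poll time is only reported for the multi-threaded runtime here, and
// the poll-count histogram stays empty because it was never enabled.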
#[test]
fn worker_poll_count_and_time() {
    const N: u64 = 5;

    async fn task() {
        // Sync sleep
        std::thread::sleep(std::time::Duration::from_micros(10));
    }

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(task()).await.unwrap();
        }
    });
    drop(rt);
    assert_eq!(N, metrics.worker_poll_count(0));
    // Not currently supported for current-thread runtime
    assert_eq!(Duration::default(), metrics.worker_mean_poll_time(0));

    // Does not populate the histogram
    assert!(!metrics.poll_count_histogram_enabled());
    for i in 0..10 {
        assert_eq!(0, metrics.poll_count_histogram_bucket_count(0, i));
    }

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(task()).await.unwrap();
        }
    });
    drop(rt);
    // Account for the `block_on` task
    let n = (0..metrics.num_workers())
        .map(|i| metrics.worker_poll_count(i))
        .sum();

    assert_eq!(N, n);

    let n: Duration = (0..metrics.num_workers())
        .map(|i| metrics.worker_mean_poll_time(i))
        .sum();

    assert!(n > Duration::default());

    // Does not populate the histogram
    assert!(!metrics.poll_count_histogram_enabled());
    for n in 0..metrics.num_workers() {
        for i in 0..10 {
            assert_eq!(0, metrics.poll_count_histogram_bucket_count(n, i));
        }
    }
}

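// With the histogram explicitly enabled, every poll is recorded in one of the
// configured buckets, so the bucket counts summed over all workers and buckets
// should add up to the number of spawned tasks.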
#[test]
fn worker_poll_count_histogram() {
    const N: u64 = 5;

    let rts = [
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .enable_metrics_poll_count_histogram()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap(),
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .enable_metrics_poll_count_histogram()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap(),
    ];

    for rt in rts {
        let metrics = rt.metrics();
        rt.block_on(async {
            for _ in 0..N {
                tokio::spawn(async {}).await.unwrap();
            }
        });
        drop(rt);

        let num_workers = metrics.num_workers();
        let num_buckets = metrics.poll_count_histogram_num_buckets();

        assert!(metrics.poll_count_histogram_enabled());
        assert_eq!(num_buckets, 3);

        let n = (0..num_workers)
            .flat_map(|i| (0..num_buckets).map(move |j| (i, j)))
            .map(|(worker, bucket)| metrics.poll_count_histogram_bucket_count(worker, bucket))
            .sum();
        assert_eq!(N, n);
    }
}

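// Bucket ranges depend on the scale. With a linear scale and a 50us resolution
// the buckets are [0, 50us), [50us, 100us), and [100us, max). With a log scale
// the first boundary is the resolution rounded up to the next power of two of
// nanoseconds, and each later boundary doubles, as the assertions below show.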
#[test]
fn worker_poll_count_histogram_range() {
    let max = Duration::from_nanos(u64::MAX);

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .enable_metrics_poll_count_histogram()
        .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
        .metrics_poll_count_histogram_buckets(3)
        .metrics_poll_count_histogram_resolution(us(50))
        .build()
        .unwrap();
    let metrics = rt.metrics();

    assert_eq!(metrics.poll_count_histogram_bucket_range(0), us(0)..us(50));
    assert_eq!(
        metrics.poll_count_histogram_bucket_range(1),
        us(50)..us(100)
    );
    assert_eq!(metrics.poll_count_histogram_bucket_range(2), us(100)..max);

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .enable_metrics_poll_count_histogram()
        .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Log)
        .metrics_poll_count_histogram_buckets(3)
        .metrics_poll_count_histogram_resolution(us(50))
        .build()
        .unwrap();
    let metrics = rt.metrics();

    let a = Duration::from_nanos(50000_u64.next_power_of_two());
    let b = a * 2;

    assert_eq!(metrics.poll_count_histogram_bucket_range(0), us(0)..a);
    assert_eq!(metrics.poll_count_histogram_bucket_range(1), a..b);
    assert_eq!(metrics.poll_count_histogram_bucket_range(2), b..max);
}

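// Configuring histogram parameters without calling
// `enable_metrics_poll_count_histogram()` must leave the histogram disabled.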
#[test]
fn worker_poll_count_histogram_disabled_without_explicit_enable() {
    let rts = [
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap(),
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap(),
    ];

    for rt in rts {
        let metrics = rt.metrics();
        assert!(!metrics.poll_count_histogram_enabled());
    }
}

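// Polling tasks should accumulate a non-zero busy duration on every worker.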
#[test]
fn worker_total_busy_duration() {
    const N: usize = 5;

    let zero = Duration::from_millis(0);

    let rt = current_thread();
    let metrics = rt.metrics();

    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {
                tokio::task::yield_now().await;
            })
            .await
            .unwrap();
        }
    });

    drop(rt);

    assert!(zero < metrics.worker_total_busy_duration(0));

    let rt = threaded();
    let metrics = rt.metrics();

    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {
                tokio::task::yield_now().await;
            })
            .await
            .unwrap();
        }
    });

    drop(rt);

    for i in 0..metrics.num_workers() {
        assert!(zero < metrics.worker_total_busy_duration(i));
    }
}

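// Tasks spawned from within the runtime are pushed to the current worker's
// local queue (a "local" schedule); tasks spawned from outside go through the
// injection queue and count as "remote" schedules instead.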
#[test]
fn worker_local_schedule_count() {
    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        tokio::spawn(async {}).await.unwrap();
    });
    drop(rt);

    assert_eq!(1, metrics.worker_local_schedule_count(0));
    assert_eq!(0, metrics.remote_schedule_count());

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        // Move to the runtime
        tokio::spawn(async {
            tokio::spawn(async {}).await.unwrap();
        })
        .await
        .unwrap();
    });
    drop(rt);

    let n: u64 = (0..metrics.num_workers())
        .map(|i| metrics.worker_local_schedule_count(i))
        .sum();

    assert_eq!(2, n);
    assert_eq!(1, metrics.remote_schedule_count());
}

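// A worker's local run queue has a bounded capacity. When it fills up, a batch
// of tasks is moved to the injection queue and the overflow count goes up.
// Spawning 300 tasks while the other worker is blocked should overflow exactly
// once.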
#[test]
fn worker_overflow_count() {
    // Only applies to the threaded worker
    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        // Move to the runtime
        tokio::spawn(async {
            let (tx1, rx1) = std::sync::mpsc::channel();
            let (tx2, rx2) = std::sync::mpsc::channel();

            // First, we need to block the other worker until all tasks have
            // been spawned.
            tokio::spawn(async move {
                tx1.send(()).unwrap();
                rx2.recv().unwrap();
            });

            // Bump the next-run spawn
            tokio::spawn(async {});

            rx1.recv().unwrap();

            // Spawn many tasks
            for _ in 0..300 {
                tokio::spawn(async {});
            }

            tx2.send(()).unwrap();
        })
        .await
        .unwrap();
    });
    drop(rt);

    let n: u64 = (0..metrics.num_workers())
        .map(|i| metrics.worker_overflow_count(i))
        .sum();

    assert_eq!(1, n);
}

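// Tasks spawned from outside the runtime land in the injection (global) queue.
// For the multi-threaded runtime, both workers are blocked first so the queued
// tasks cannot be drained while the depth is checked.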
#[test]
fn injection_queue_depth() {
    use std::thread;

    let rt = current_thread();
    let handle = rt.handle().clone();
    let metrics = rt.metrics();

    thread::spawn(move || {
        handle.spawn(async {});
    })
    .join()
    .unwrap();

    assert_eq!(1, metrics.injection_queue_depth());

    let rt = threaded();
    let handle = rt.handle().clone();
    let metrics = rt.metrics();

    // First, we need to block the runtime workers
    let (tx1, rx1) = std::sync::mpsc::channel();
    let (tx2, rx2) = std::sync::mpsc::channel();
    let (tx3, rx3) = std::sync::mpsc::channel();
    let rx3 = Arc::new(Mutex::new(rx3));

    rt.spawn(async move { rx1.recv().unwrap() });
    rt.spawn(async move { rx2.recv().unwrap() });

    // Spawn some more to make sure there are items
    for _ in 0..10 {
        let rx = rx3.clone();
        rt.spawn(async move {
            rx.lock().unwrap().recv().unwrap();
        });
    }

    thread::spawn(move || {
        handle.spawn(async {});
    })
    .join()
    .unwrap();

    let n = metrics.injection_queue_depth();
    assert!(1 <= n, "{}", n);
    assert!(15 >= n, "{}", n);

    for _ in 0..10 {
        tx3.send(()).unwrap();
    }

    tx1.send(()).unwrap();
    tx2.send(()).unwrap();
}

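// Tasks spawned from inside the runtime sit in the spawning worker's local
// queue until they are polled, so the summed local queue depth should match
// the number of tasks spawned and not yet run.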
#[test]
fn worker_local_queue_depth() {
    const N: usize = 100;

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {});
        }

        assert_eq!(N, metrics.worker_local_queue_depth(0));
    });

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async move {
        // Move to the runtime
        tokio::spawn(async move {
            let (tx1, rx1) = std::sync::mpsc::channel();
            let (tx2, rx2) = std::sync::mpsc::channel();

            // First, we need to block the other worker until all tasks have
            // been spawned.
            tokio::spawn(async move {
                tx1.send(()).unwrap();
                rx2.recv().unwrap();
            });

            // Bump the next-run spawn
            tokio::spawn(async {});

            rx1.recv().unwrap();

            // Spawn some tasks
            for _ in 0..100 {
                tokio::spawn(async {});
            }

            let n: usize = (0..metrics.num_workers())
                .map(|i| metrics.worker_local_queue_depth(i))
                .sum();

            assert_eq!(n, N);

            tx2.send(()).unwrap();
        })
        .await
        .unwrap();
    });
}

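// Each task gets a cooperative budget per poll. `consume_budget()` spends one
// unit and returns `Pending` once the budget is exhausted; that forced yield
// is what `budget_forced_yield_count()` counts.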
#[test]
fn budget_exhaustion_yield() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    let mut did_yield = false;

    // block on a task which consumes budget until it yields
    rt.block_on(poll_fn(|cx| loop {
        if did_yield {
            return Poll::Ready(());
        }

        let fut = consume_budget();
        tokio::pin!(fut);

        if fut.poll(cx).is_pending() {
            did_yield = true;
            return Poll::Pending;
        }
    }));

    assert_eq!(1, rt.metrics().budget_forced_yield_count());
}

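// Both branches of the `join!` run inside the same task and therefore share a
// single budget, so exhausting it still registers as one forced yield.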
#[test]
fn budget_exhaustion_yield_with_joins() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    let mut did_yield_1 = false;
    let mut did_yield_2 = false;

    // block on a task which consumes budget until it yields
    rt.block_on(async {
        tokio::join!(
            poll_fn(|cx| loop {
                if did_yield_1 {
                    return Poll::Ready(());
                }

                let fut = consume_budget();
                tokio::pin!(fut);

                if fut.poll(cx).is_pending() {
                    did_yield_1 = true;
                    return Poll::Pending;
                }
            }),
            poll_fn(|cx| loop {
                if did_yield_2 {
                    return Poll::Ready(());
                }

                let fut = consume_budget();
                tokio::pin!(fut);

                if fut.poll(cx).is_pending() {
                    did_yield_2 = true;
                    return Poll::Pending;
                }
            })
        )
    });

    assert_eq!(1, rt.metrics().budget_forced_yield_count());
}

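// The IO driver counts file descriptors as they are registered and
// deregistered with the reactor; connecting and then dropping a `TcpStream`
// should bump each counter once. Note that these tests need network access.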
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_fd_count() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(metrics.io_driver_fd_registered_count(), 0);

    let stream = tokio::net::TcpStream::connect("google.com:80");
    let stream = rt.block_on(async move { stream.await.unwrap() });

    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
    assert_eq!(metrics.io_driver_fd_deregistered_count(), 0);

    drop(stream);

    assert_eq!(metrics.io_driver_fd_deregistered_count(), 1);
    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
}

#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_ready_count() {
    let rt = current_thread();
    let metrics = rt.metrics();

    let stream = tokio::net::TcpStream::connect("google.com:80");
    let _stream = rt.block_on(async move { stream.await.unwrap() });

    assert_eq!(metrics.io_driver_ready_count(), 1);
}

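// Helpers: the two runtime flavors exercised above.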
fn current_thread() -> Runtime {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
}

fn threaded() -> Runtime {
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .unwrap()
}

fn us(n: u64) -> Duration {
    Duration::from_micros(n)
}