• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::loom::sync::atomic::AtomicUsize;
2 use crate::loom::sync::Arc;
3 use crate::loom::thread;
4 use crate::runtime::{Builder, Runtime};
5 use crate::sync::oneshot::{self, Receiver};
6 use crate::task;
7 use std::future::Future;
8 use std::pin::Pin;
9 use std::sync::atomic::Ordering::{Acquire, Release};
10 use std::task::{Context, Poll};
11 
assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize)12 fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) {
13     let (tx, rx) = oneshot::channel();
14     let num_polls = Arc::new(AtomicUsize::new(0));
15     rt.spawn(async move {
16         for _ in 0..12 {
17             task::yield_now().await;
18         }
19         tx.send(()).unwrap();
20     });
21 
22     rt.block_on(async {
23         BlockedFuture {
24             rx,
25             num_polls: num_polls.clone(),
26         }
27         .await;
28     });
29 
30     let polls = num_polls.load(Acquire);
31     assert!(polls <= at_most_polls);
32 }
33 
34 #[test]
block_on_num_polls()35 fn block_on_num_polls() {
36     loom::model(|| {
37         // we expect at most 3 number of polls because there are
38         // three points at which we poll the future. At any of these
39         // points it can be ready:
40         //
41         // - when we fail to steal the parker and we block on a
42         //   notification that it is available.
43         //
44         // - when we steal the parker and we schedule the future
45         //
46         // - when the future is woken up and we have ran the max
47         //   number of tasks for the current tick or there are no
48         //   more tasks to run.
49         //
50         let at_most = 3;
51 
52         let rt1 = Arc::new(Builder::new_current_thread().build().unwrap());
53         let rt2 = rt1.clone();
54         let rt3 = rt1.clone();
55 
56         let th1 = thread::spawn(move || assert_at_most_num_polls(rt1, at_most));
57         let th2 = thread::spawn(move || assert_at_most_num_polls(rt2, at_most));
58         let th3 = thread::spawn(move || assert_at_most_num_polls(rt3, at_most));
59 
60         th1.join().unwrap();
61         th2.join().unwrap();
62         th3.join().unwrap();
63     });
64 }
65 
66 struct BlockedFuture {
67     rx: Receiver<()>,
68     num_polls: Arc<AtomicUsize>,
69 }
70 
71 impl Future for BlockedFuture {
72     type Output = ();
73 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>74     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
75         self.num_polls.fetch_add(1, Release);
76 
77         match Pin::new(&mut self.rx).poll(cx) {
78             Poll::Pending => Poll::Pending,
79             _ => Poll::Ready(()),
80         }
81     }
82 }
83