• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 mod yield_now;
2 
3 use crate::loom::sync::atomic::AtomicUsize;
4 use crate::loom::sync::Arc;
5 use crate::loom::thread;
6 use crate::runtime::{Builder, Runtime};
7 use crate::sync::oneshot::{self, Receiver};
8 use crate::task;
9 use std::future::Future;
10 use std::pin::Pin;
11 use std::sync::atomic::Ordering::{Acquire, Release};
12 use std::task::{Context, Poll};
13 
assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize)14 fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) {
15     let (tx, rx) = oneshot::channel();
16     let num_polls = Arc::new(AtomicUsize::new(0));
17     rt.spawn(async move {
18         for _ in 0..12 {
19             task::yield_now().await;
20         }
21         tx.send(()).unwrap();
22     });
23 
24     rt.block_on(async {
25         BlockedFuture {
26             rx,
27             num_polls: num_polls.clone(),
28         }
29         .await;
30     });
31 
32     let polls = num_polls.load(Acquire);
33     assert!(polls <= at_most_polls);
34 }
35 
#[test]
fn block_on_num_polls() {
    loom::model(|| {
        // The blocked future may be polled up to 4 times: there are three
        // points at which `block_on` polls it, plus one opportunity for a
        // false positive. The future can become ready at any of them:
        //
        // - we fail to steal the parker and block on a notification that it
        //   is available.
        //
        // - we steal the parker and schedule the future.
        //
        // - the future is woken up and we have run the maximum number of
        //   tasks for the current tick, or there are no more tasks to run.
        //
        // - a thread is notified that the parker is available, but a third
        //   thread acquires it before the notified thread can.
        const MAX_POLLS: usize = 4;

        // Three handles to one shared runtime, one per thread.
        let first = Arc::new(Builder::new_current_thread().build().unwrap());
        let second = Arc::clone(&first);
        let third = Arc::clone(&first);

        let workers = vec![
            thread::spawn(move || assert_at_most_num_polls(first, MAX_POLLS)),
            thread::spawn(move || assert_at_most_num_polls(second, MAX_POLLS)),
            thread::spawn(move || assert_at_most_num_polls(third, MAX_POLLS)),
        ];

        // Join in spawn order; a panic in any worker fails the test.
        for worker in workers {
            worker.join().unwrap();
        }
    });
}
69 
#[test]
fn assert_no_unnecessary_polls() {
    loom::model(|| {
        // After we poll the outer future, woken should reset to false.
        let rt = Builder::new_current_thread().build().unwrap();
        let (tx, rx) = oneshot::channel();
        let pending_cnt = Arc::new(AtomicUsize::new(0));

        rt.spawn(async move {
            for _ in 0..24 {
                task::yield_now().await;
            }
            tx.send(()).unwrap();
        });

        let pending_cnt_clone = pending_cnt.clone();
        rt.block_on(async move {
            // Use task::yield_now() to ensure woken is set to true before
            // ResetFuture is awaited. ResetFuture should then return
            // Pending at most once. There are two cases:
            //
            // 1. No message has arrived yet: ResetFuture is polled, returns
            //    Pending, and increments pending_cnt. When the message
            //    arrives it is polled again and returns Ready, so we expect
            //    pending_cnt == 1.
            // 2. The message has already arrived: ResetFuture returns Ready
            //    on its first poll, so we expect pending_cnt == 0.
            task::yield_now().await;
            ResetFuture {
                rx,
                pending_cnt: pending_cnt_clone,
            }
            .await;
        });

        let pending_cnt = pending_cnt.load(Acquire);
        // Include the observed count so a failing interleaving is easier to
        // diagnose.
        assert!(
            pending_cnt <= 1,
            "ResetFuture returned Pending {} times, expected at most 1",
            pending_cnt
        );
    });
}
108 
109 struct BlockedFuture {
110     rx: Receiver<()>,
111     num_polls: Arc<AtomicUsize>,
112 }
113 
114 impl Future for BlockedFuture {
115     type Output = ();
116 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>117     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
118         self.num_polls.fetch_add(1, Release);
119 
120         match Pin::new(&mut self.rx).poll(cx) {
121             Poll::Pending => Poll::Pending,
122             _ => Poll::Ready(()),
123         }
124     }
125 }
126 
127 struct ResetFuture {
128     rx: Receiver<()>,
129     pending_cnt: Arc<AtomicUsize>,
130 }
131 
132 impl Future for ResetFuture {
133     type Output = ();
134 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>135     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
136         match Pin::new(&mut self.rx).poll(cx) {
137             Poll::Pending => {
138                 self.pending_cnt.fetch_add(1, Release);
139                 Poll::Pending
140             }
141             _ => Poll::Ready(()),
142         }
143     }
144 }
145