1 mod yield_now;
2
3 use crate::loom::sync::atomic::AtomicUsize;
4 use crate::loom::sync::Arc;
5 use crate::loom::thread;
6 use crate::runtime::{Builder, Runtime};
7 use crate::sync::oneshot::{self, Receiver};
8 use crate::task;
9 use std::future::Future;
10 use std::pin::Pin;
11 use std::sync::atomic::Ordering::{Acquire, Release};
12 use std::task::{Context, Poll};
13
assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize)14 fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) {
15 let (tx, rx) = oneshot::channel();
16 let num_polls = Arc::new(AtomicUsize::new(0));
17 rt.spawn(async move {
18 for _ in 0..12 {
19 task::yield_now().await;
20 }
21 tx.send(()).unwrap();
22 });
23
24 rt.block_on(async {
25 BlockedFuture {
26 rx,
27 num_polls: num_polls.clone(),
28 }
29 .await;
30 });
31
32 let polls = num_polls.load(Acquire);
33 assert!(polls <= at_most_polls);
34 }
35
#[test]
fn block_on_num_polls() {
    loom::model(|| {
        // The blocked future may be polled at most 4 times: there are three
        // points at which `block_on` polls it, plus one opportunity for a
        // false positive. The future can turn out to be ready at any of them:
        //
        // - we fail to steal the parker and block on a notification that it
        //   has become available;
        //
        // - we steal the parker and schedule the future;
        //
        // - the future is woken up and we have either run the maximum number
        //   of tasks for the current tick or there are no more tasks to run;
        //
        // - a thread is notified that the parker is available, but a third
        //   thread acquires it before the notified thread can.
        const MAX_POLLS: usize = 4;

        // Three handles to one current-thread runtime, one per worker thread.
        let first_rt = Arc::new(Builder::new_current_thread().build().unwrap());
        let second_rt = first_rt.clone();
        let third_rt = first_rt.clone();

        let spawn_checker =
            |rt: Arc<Runtime>| thread::spawn(move || assert_at_most_num_polls(rt, MAX_POLLS));

        let first = spawn_checker(first_rt);
        let second = spawn_checker(second_rt);
        let third = spawn_checker(third_rt);

        first.join().unwrap();
        second.join().unwrap();
        third.join().unwrap();
    });
}
69
#[test]
fn assert_no_unnecessary_polls() {
    loom::model(|| {
        // After the outer future is polled, `woken` should be reset to false.
        let rt = Builder::new_current_thread().build().unwrap();
        let (tx, rx) = oneshot::channel();
        let pending_cnt = Arc::new(AtomicUsize::new(0));

        // Background task: yield a number of times, then complete the channel.
        rt.spawn(async move {
            for _ in 0..24 {
                task::yield_now().await;
            }
            tx.send(()).unwrap();
        });

        let pending_cnt_clone = pending_cnt.clone();
        rt.block_on(async move {
            // Use `task::yield_now()` to ensure `woken` is set to true before
            // `ResetFuture` is awaited. `ResetFuture` should then observe
            // `Pending` at most once. There are two cases:
            //
            // 1. No message has arrived on the channel yet: `ResetFuture` is
            //    polled, gets `Pending`, and increments `pending_cnt`. When
            //    the message arrives it returns `Ready`, so we expect
            //    `pending_cnt == 1`.
            //
            // 2. The message has already arrived: `ResetFuture` returns
            //    `Ready` immediately, so we expect `pending_cnt == 0`.
            task::yield_now().await;
            ResetFuture {
                rx,
                pending_cnt: pending_cnt_clone,
            }
            .await;
        });

        let pending_cnt = pending_cnt.load(Acquire);
        // Report the observed count so a failing interleaving is diagnosable.
        assert!(
            pending_cnt <= 1,
            "ResetFuture returned Pending {} times, expected at most 1",
            pending_cnt
        );
    });
}
108
109 struct BlockedFuture {
110 rx: Receiver<()>,
111 num_polls: Arc<AtomicUsize>,
112 }
113
114 impl Future for BlockedFuture {
115 type Output = ();
116
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>117 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
118 self.num_polls.fetch_add(1, Release);
119
120 match Pin::new(&mut self.rx).poll(cx) {
121 Poll::Pending => Poll::Pending,
122 _ => Poll::Ready(()),
123 }
124 }
125 }
126
127 struct ResetFuture {
128 rx: Receiver<()>,
129 pending_cnt: Arc<AtomicUsize>,
130 }
131
132 impl Future for ResetFuture {
133 type Output = ();
134
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>135 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
136 match Pin::new(&mut self.rx).poll(cx) {
137 Poll::Pending => {
138 self.pending_cnt.fetch_add(1, Release);
139 Poll::Pending
140 }
141 _ => Poll::Ready(()),
142 }
143 }
144 }
145