• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::sync::Notify;
2 
3 use loom::future::block_on;
4 use loom::sync::Arc;
5 use loom::thread;
6 
7 use tokio_test::{assert_pending, assert_ready};
8 
/// Test-local copy of `util::wake_list::NUM_WAKERS`.
///
/// Several tests below register `WAKE_LIST_SIZE + 1` waiters so that the
/// waiters do not all fit in one chunk when `notify_waiters` is called.
// NOTE(review): presumably this must be kept equal to `NUM_WAKERS` by hand —
// confirm against `util::wake_list` when changing either value.
const WAKE_LIST_SIZE: usize = 32;
11 
/// A waiter thread awaits `notified()` while the main thread calls
/// `notify_one`; the waiter thread is then joined, so it has to finish.
#[test]
fn notify_one() {
    loom::model(|| {
        let notifier = Arc::new(Notify::new());
        let waiter = notifier.clone();

        // The waiter blocks until it receives the notification.
        let waiter_thread = thread::spawn(move || {
            block_on(async {
                waiter.notified().await;
            })
        });

        notifier.notify_one();
        waiter_thread.join().unwrap();
    });
}
28 
/// Two `Notified` futures are created before a second thread calls
/// `notify_waiters`; both futures are then awaited on the main thread.
#[test]
fn notify_waiters() {
    loom::model(|| {
        let notify = Arc::new(Notify::new());
        let notifier = notify.clone();

        // Both futures exist before the notifying thread is spawned.
        let first = notify.notified();
        let second = notify.notified();

        let notifier_thread = thread::spawn(move || notifier.notify_waiters());

        block_on(async {
            first.await;
            second.await;
        });

        notifier_thread.join().unwrap();
    });
}
49 
/// Runs `notify_waiters`, `notify_one` and a waiter on three separate
/// threads and joins all of them, so the waiter has to complete.
#[test]
fn notify_waiters_and_one() {
    loom::model(|| {
        let notify = Arc::new(Notify::new());
        let for_waiters = notify.clone();
        let for_one = notify.clone();

        let waiters_thread = thread::spawn(move || for_waiters.notify_waiters());

        let one_thread = thread::spawn(move || for_one.notify_one());

        // The original handle moves into the waiting thread.
        let waiting_thread = thread::spawn(move || {
            let pending = notify.notified();

            block_on(async {
                pending.await;
            });
        });

        waiters_thread.join().unwrap();
        one_thread.join().unwrap();
        waiting_thread.join().unwrap();
    });
}
78 
/// Two threads each wait for a notification and then issue one themselves.
/// After an initial `notify_one` and joining both threads, the main thread
/// awaits one more notification.
#[test]
fn notify_multi() {
    loom::model(|| {
        let notify = Arc::new(Notify::new());

        // Each worker consumes one notification and produces the next one.
        let workers: Vec<_> = (0..2)
            .map(|_| {
                let notify = notify.clone();

                thread::spawn(move || {
                    block_on(async {
                        notify.notified().await;
                        notify.notify_one();
                    })
                })
            })
            .collect();

        notify.notify_one();

        for worker in workers {
            worker.join().unwrap();
        }

        // Awaits the notification issued by whichever worker ran last.
        block_on(async {
            notify.notified().await;
        });
    });
}
108 
/// One thread polls a `Notified` future exactly once and then lets it go
/// (re-issuing the notification if that single poll consumed it), while a
/// second thread awaits two notifications in a row.
#[test]
fn notify_drop() {
    use crate::future::poll_fn;
    use std::future::Future;
    use std::task::Poll;

    loom::model(|| {
        let notify = Arc::new(Notify::new());
        let single_poll_handle = notify.clone();
        let waiter_handle = notify.clone();

        let single_poll_thread = thread::spawn(move || {
            let mut pending = Box::pin(single_poll_handle.notified());

            block_on(poll_fn(|cx| {
                // If this one poll took the notification, hand it on.
                if let Poll::Ready(()) = pending.as_mut().poll(cx) {
                    single_poll_handle.notify_one();
                }
                // Finish after a single poll regardless of its outcome.
                Poll::Ready(())
            }));
        });

        let waiter_thread = thread::spawn(move || {
            block_on(async {
                waiter_handle.notified().await;
                // Trigger second notification
                waiter_handle.notify_one();
                waiter_handle.notified().await;
            });
        });

        notify.notify_one();

        single_poll_thread.join().unwrap();
        waiter_thread.join().unwrap();
    });
}
146 
/// Polls two `Notified` futures while another thread calls `notify_waiters`
/// and verifies that the two results agree: whenever the first future
/// reports ready, the second one has to report ready as well.
#[test]
fn notify_waiters_poll_consistency() {
    /// Runs one scenario. `poll_setting[i]` selects whether future `i` is
    /// polled once (and asserted pending) before `notify_waiters` is started.
    fn run_variant(poll_setting: [bool; 2]) {
        let notify = Arc::new(Notify::new());
        let mut notified = [
            tokio_test::task::spawn(notify.notified()),
            tokio_test::task::spawn(notify.notified()),
        ];
        for (fut, &poll_beforehand) in notified.iter_mut().zip(poll_setting.iter()) {
            if poll_beforehand {
                assert_pending!(fut.poll());
            }
        }

        let notifier = notify.clone();
        let notifier_thread = thread::spawn(move || notifier.notify_waiters());

        let first = notified[0].poll();
        let second = notified[1].poll();

        // A ready first future implies a ready second future.
        assert!(second.is_ready() || first.is_pending());

        notifier_thread.join().unwrap();
    }

    // Cover every combination of "already polled" / "not yet polled" for the
    // two futures at the time `notify_waiters` is called.
    for &poll_setting in &[[false, false], [true, false], [false, true], [true, true]] {
        loom::model(move || run_variant(poll_setting));
    }
}
185 
/// Same consistency check as the two-future test: if the first future is
/// woken by a `notify_waiters` call, the second one must be woken too.
///
/// Extra `Notified` futures are polled in between the two tested ones so
/// that the tested futures end up in different chunks.
#[test]
fn notify_waiters_poll_consistency_many() {
    /// Runs one scenario. `order` holds the indices (0 or 1) of the tested
    /// future polled before, and the one polled after, the filler futures.
    fn run_variant(order: [usize; 2]) {
        let [polled_first, polled_last] = order;
        let notify = Arc::new(Notify::new());

        let mut futs: Vec<_> = (0..WAKE_LIST_SIZE + 1)
            .map(|_| tokio_test::task::spawn(notify.notified()))
            .collect();

        assert_pending!(futs[polled_first].poll());
        // Filler futures separate the two tested ones.
        for filler in futs.iter_mut().skip(2) {
            assert_pending!(filler.poll());
        }
        assert_pending!(futs[polled_last].poll());

        let notifier = notify.clone();
        let notifier_thread = thread::spawn(move || notifier.notify_waiters());

        let first = futs[0].poll();
        let second = futs[1].poll();

        // A ready first future implies a ready second future.
        assert!(second.is_ready() || first.is_pending());

        notifier_thread.join().unwrap();
    }

    // Exercise both polling orders of the two tested futures.
    for &order in &[[0, 1], [1, 0]] {
        loom::model(move || run_variant(order));
    }
}
225 
/// Checks if a call to `notify_waiters` is observed as atomic when combined
/// with a concurrent call to `notify_one`.
#[test]
fn notify_waiters_is_atomic() {
    /// Runs one scenario. `tested_fut_index` is the position, within a list
    /// of `WAKE_LIST_SIZE + 1` polled futures, of the future that is awaited.
    fn notify_waiters_is_atomic_variant(tested_fut_index: usize) {
        let notify = Arc::new(Notify::new());

        let mut futs = (0..WAKE_LIST_SIZE + 1)
            .map(|_| tokio_test::task::spawn(notify.notified()))
            .collect::<Vec<_>>();

        // Poll every future once; all of them must still be pending.
        for fut in &mut futs {
            assert_pending!(fut.poll());
        }

        let tx = notify.clone();
        let th = thread::spawn(move || {
            tx.notify_waiters();
        });

        block_on(async {
            // If awaiting one of the futures completes, then we should be
            // able to assume that all pending futures are notified. Therefore
            // a notification from a subsequent `notify_one` call should not
            // be consumed by an old future.
            futs.remove(tested_fut_index).await;

            let mut new_fut = tokio_test::task::spawn(notify.notified());
            assert_pending!(new_fut.poll());

            notify.notify_one();

            // `new_fut` must consume the notification from `notify_one`.
            assert_ready!(new_fut.poll());
        });

        th.join().unwrap();
    }

    // We test different scenarios in which the tested future is at the beginning
    // or at the end of the waiters queue used by `Notify`. The list holds
    // `WAKE_LIST_SIZE + 1` futures, so `WAKE_LIST_SIZE` is the index of the
    // last one; deriving it from the constant keeps both in sync.
    loom::model(|| notify_waiters_is_atomic_variant(0));
    loom::model(|| notify_waiters_is_atomic_variant(WAKE_LIST_SIZE));
}
270 
/// Verifies that one `notify_waiters` call cannot complete two `Notified`
/// futures that are created and awaited one after the other, as in:
/// ```ignore
/// notify.notified().await;
/// notify.notified().await;
/// ```
#[test]
fn notify_waiters_sequential_notified_await() {
    use crate::sync::oneshot;

    loom::model(|| {
        let notify = Arc::new(Notify::new());

        // `ready_*` signals "waiters are registered"; `done_*` signals
        // "`notify_waiters` has returned".
        let (ready_tx, ready_rx) = oneshot::channel();
        let (done_tx, done_rx) = oneshot::channel();

        let receiver_notify = notify.clone();
        let receiver = thread::spawn(move || {
            block_on(async {
                // Register the first `Notified` by polling it, making it
                // the first waiter in the queue.
                let mut first = tokio_test::task::spawn(receiver_notify.notified());
                assert_pending!(first.poll());

                // Add more waiters so that `notify_waiters` is forced to
                // release the lock at least once.
                let _filler_waiters: Vec<_> = (0..WAKE_LIST_SIZE + 1)
                    .map(|_| {
                        let mut waiter = tokio_test::task::spawn(receiver_notify.notified());
                        assert_pending!(waiter.poll());
                        waiter
                    })
                    .collect();

                // Tell the main thread it may call `notify_waiters` now.
                ready_tx.send(()).unwrap();

                first.await;

                // Poll a second `Notified` to try to insert it into the
                // waiters queue.
                let mut second = tokio_test::task::spawn(receiver_notify.notified());
                assert_pending!(second.poll());

                // Once `notify_waiters` has finished, the second future
                // must still be pending.
                done_rx.await.unwrap();
                assert_pending!(second.poll());
            });
        });

        // Wait until the receiver is set up, then notify all waiters.
        block_on(ready_rx).unwrap();
        notify.notify_waiters();
        done_tx.send(()).unwrap();

        receiver.join().unwrap();
    });
}
332