• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::sync::oneshot;
2 
3 use futures::future::poll_fn;
4 use loom::future::block_on;
5 use loom::thread;
6 use std::task::Poll::{Pending, Ready};
7 
/// Basic send/receive check: a spawned thread sends `1` through a oneshot
/// channel while the model's main thread blocks on the receiver and verifies
/// the value that arrives.
#[test]
fn smoke() {
    loom::model(|| {
        let (sender, receiver) = oneshot::channel();

        // Producer side runs on its own loom thread.
        thread::spawn(move || {
            sender.send(1).unwrap();
        });

        // Consumer side: drive the receiver to completion and check the payload.
        assert_eq!(1, block_on(receiver).unwrap());
    });
}
21 
/// Polls the receiver once from a spawned thread and, if that single poll
/// returned `Pending`, finishes receiving from the main thread with a separate
/// `block_on` call. The value `1` must be observed whichever poll completes.
#[test]
fn changing_rx_task() {
    loom::model(|| {
        let (sender, mut receiver) = oneshot::channel();

        thread::spawn(move || {
            sender.send(1).unwrap();
        });

        let first_attempt = thread::spawn(move || {
            // Poll exactly once; the wrapper future always resolves and reports
            // whether the receiver had already completed.
            let completed = block_on(poll_fn(|cx| {
                let outcome = Pin::new(&mut receiver).poll(cx);
                match outcome {
                    Ready(Ok(value)) => {
                        assert_eq!(1, value);
                        Ready(true)
                    }
                    Ready(Err(_)) => unimplemented!(),
                    Pending => Ready(false),
                }
            }));

            // Hand the receiver back only when it still has to be driven.
            if completed {
                None
            } else {
                Some(receiver)
            }
        });
        let leftover = first_attempt.join().unwrap();

        if let Some(receiver) = leftover {
            // The first poll did not complete; finish from this thread.
            let value = block_on(receiver).unwrap();
            assert_eq!(1, value);
        }
    });
}
57 
58 // TODO: Move this into `oneshot` proper.
59 
60 use std::future::Future;
61 use std::pin::Pin;
62 use std::task::{Context, Poll};
63 
64 struct OnClose<'a> {
65     tx: &'a mut oneshot::Sender<i32>,
66 }
67 
68 impl<'a> OnClose<'a> {
new(tx: &'a mut oneshot::Sender<i32>) -> Self69     fn new(tx: &'a mut oneshot::Sender<i32>) -> Self {
70         OnClose { tx }
71     }
72 }
73 
74 impl Future for OnClose<'_> {
75     type Output = bool;
76 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool>77     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
78         let fut = self.get_mut().tx.closed();
79         crate::pin!(fut);
80 
81         Ready(fut.poll(cx).is_ready())
82     }
83 }
84 
/// Checks `Sender::closed` via `OnClose` once from a spawned thread and, if
/// that poll did not see the channel closed, once more from the main thread
/// with a separate `block_on` call, while another thread drops the receiver.
#[test]
fn changing_tx_task() {
    loom::model(|| {
        let (mut sender, receiver) = oneshot::channel::<i32>();

        // Dropping the receiver on its own thread closes the channel.
        thread::spawn(move || {
            drop(receiver);
        });

        let first_attempt = thread::spawn(move || {
            let saw_closed = block_on(OnClose::new(&mut sender));

            // Hand the sender back only when the closure was not yet observed.
            if saw_closed {
                None
            } else {
                Some(sender)
            }
        });
        let leftover = first_attempt.join().unwrap();

        if let Some(mut sender) = leftover {
            // The first poll was not ready; poll again from this thread.
            block_on(OnClose::new(&mut sender));
        }
    });
}
112